package my.net.server.service;

import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import my.net.bean.Message;
import my.util.ClassFilter;
import my.util.ClassUtil;
import my.util.PackageScan;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushServer {
	private static final Logger logger = LoggerFactory
			.getLogger(PushServer.class);
	/**
	 * userid -> Channel mapping
	 */
	private static ConcurrentHashMap<String, Channel> allChannels = new ConcurrentHashMap<String, Channel>();

	private static ConcurrentHashMap<String, Processor> processors = new ConcurrentHashMap<String, Processor>();

	private static ConcurrentHashMap<String, Processor> noLoginProcessors = new ConcurrentHashMap<String, Processor>();
	
	
	private static ScheduledExecutorService service;
	
	static{
		/**
		 * 30
		 */
		service =Executors.newScheduledThreadPool(1);
		service.scheduleWithFixedDelay(new Runnable() {
 			
 			@Override
 			public void run() {
 				Message msg = new Message();
 				msg.setCmd("ping");
 				System.out.println("send ping");
 				PushServer.broadcast(msg);
 			}
 		}, 30, 30, TimeUnit.MINUTES);
	}

	public static void process(Message msg, ChannelHandlerContext ctx,
			MessageEvent e) {
		String cmd = msg.getCmd();

		Processor p = noLoginProcessors.get(cmd);
		Message ret = null;
		if (p != null) {
			ret = p.process(msg, e.getChannel());
		} else {
			String userid = (String) e.getChannel().getAttachment();

			if (userid == null) {
				logger.error("no login , close channel");
				e.getChannel().close();
			} else {
				if (allChannels.containsKey(userid)) {
					p = processors.get(cmd);
					if (p != null) {
						ret = p.process(msg, e.getChannel());
					}
				}
			}
		}

		if (ret != null) {
			logger.debug("ret:{}", ret.toJson());
			e.getChannel().write(ret.toJson());
		}
	}

	/**
	 * 注册某一个包下的processor，支持多个包;分隔
	 * 
	 * @param packages
	 */
	public static void register(String packages) {
		String[] packs = packages.split(";");
		Set<Class<?>> classes = new HashSet<Class<?>>();

		ClassFilter filter = new ClassFilter() {

			@Override
			public boolean accept(Class<?> cls) {

				return ClassUtil.isInterface(cls, Processor.class);
			}
		};

		for (String p : packs) {
			classes.addAll(PackageScan.getClasses(p, filter));
		}

		for (Class<?> cls : classes) {
			@SuppressWarnings("unchecked")
			Class<Processor> load = (Class<Processor>) cls;
			if (!Modifier.isAbstract(load.getModifiers())) {
				try {
					register(load.newInstance());
				} catch (InstantiationException e) {
					logger.error("can not register {}", cls.getName());
				} catch (IllegalAccessException e) {
					logger.error("can not register {}", cls.getName());
				}
			}
		}
	}

	/**
	 * 注册处理器
	 * 
	 * @param processor
	 */
	public static void register(Processor processor) {
		logger.debug("register processor {} need login:{}", processor.name(),
				processor.needLogin());
		if (processor.needLogin()) {
			processors.put(processor.name(), processor);
		} else {
			noLoginProcessors.put(processor.name(), processor);
		}
	}

	public static void login(String userid, Channel channel) {
		channel.setAttachment(userid);

		if (allChannels.get(userid) != null) {
			if (allChannels.get(userid).getId() != channel.getId()) {
				logger.warn("同一个userid ,不同的连接,close old . ");
				close(allChannels.get(userid));
			}
		}

		allChannels.put(userid, channel);
	}

	public static void loginOut(String userid) {
		close(allChannels.get(userid));
	}

	/**
	 * 广播不关心成功或失败
	 * 
	 * @param msg
	 * @return
	 */
	public static void broadcast(Message msg) {
		broadcast(msg, new OpListenerAdapter());
	}

	public static void broadcast(Message msg, final OpListener listener) {
		final String json = msg.toJson();
		for (Map.Entry<String, Channel> entry : allChannels.entrySet()) {
			ChannelFuture future = entry.getValue().write(json);
			final String userid = entry.getKey();
			future.addListener(new ChannelFutureListener() {

				@Override
				public void operationComplete(ChannelFuture future)
						throws Exception {
					if (future.isSuccess()) {
						listener.onOk(userid);
					} else if (future.isCancelled()) {
						listener.onCancel(userid);
					} else if (!future.isSuccess()) {
						logger.error("error", future.getCause());
						close(future.getChannel());
						listener.onError(userid);
					}
				}
			});
		}
	}

	public static Iterator<String> userIds() {
		return allChannels.keySet().iterator();
	}

	/**
	 * 直接发送，不关系是否失败
	 * 
	 * @param userid
	 * @param msg
	 */
	public static void sendMsg(String userid, Message msg) {
		sendMsg(userid, msg, new OpListenerAdapter());
	}

	/**
	 * 发送消息，监听是否成功
	 * 
	 * @param userid
	 * @param msg
	 * @param listener
	 */
	public static void sendMsg(final String userid, Message msg,
			final OpListener listener) {
		Channel c = allChannels.get(userid);
		if (c != null) {
			ChannelFuture future = c.write(msg.toJson());
			future.addListener(new ChannelFutureListener() {

				@Override
				public void operationComplete(ChannelFuture future)
						throws Exception {
					if (future.isSuccess()) {
						listener.onOk(userid);
					} else if (future.isCancelled()) {
						listener.onCancel(userid);
					} else if (!future.isSuccess()) {
						logger.error("error", future.getCause());
						close(future.getChannel());
						listener.onError(userid);
					}
				}
			});
		} else {
			logger.error("error,channel is null", new RuntimeException());
			listener.onError(userid);
		}
	}

	public static void close(Channel channel) {
		logger.debug("close channel:" + channel);
		if (channel == null) {
			return;
		}
		String userid = (String) channel.getAttachment();
		if (userid != null) {
			allChannels.remove(userid);
		}
		channel.close();
	}

	public static void exceptionCaught(ExceptionEvent e) {
		logger.error("error", e.getCause());
		close(e.getChannel());
	}

	public static void channelDisconnected(ChannelHandlerContext ctx,
			ChannelStateEvent e) {
		close(e.getChannel());
	}

}
